//--------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// File: AsyncReaderWriter.cs
//--------------------------------------------------------------------------
9 using System
.Collections
.Generic
;
10 using System
.Threading
.Tasks
;
11 using System
.Diagnostics
;
namespace System.Threading.Async
{
    /// <summary>Provides for asynchronous exclusive and concurrent execution support.</summary>
    /// <remarks>
    /// Work is submitted as either an exclusive "writer" or a concurrent "reader". A writer runs
    /// only while nothing else submitted to this instance is running; readers may run alongside
    /// other readers. Writers take priority: a reader is queued whenever a writer is running or
    /// waiting, and when a writer finishes the next waiting writer is started before any readers.
    /// All bookkeeping is guarded by a single private lock.
    /// </remarks>
    [DebuggerDisplay("WaitingConcurrent={WaitingConcurrent}, WaitingExclusive={WaitingExclusive}, CurrentReaders={CurrentConcurrent}, Exclusive={CurrentlyExclusive}")]
    public sealed class AsyncReaderWriter
    {
        /// <summary>The lock that protects all shared state in this instance.</summary>
        private readonly object _lock = new object();
        /// <summary>The queue of concurrent readers waiting to execute.</summary>
        private readonly Queue<Task> _waitingConcurrent = new Queue<Task>();
        /// <summary>The queue of exclusive writers waiting to execute.</summary>
        private readonly Queue<Task> _waitingExclusive = new Queue<Task>();
        /// <summary>The number of concurrent readers currently executing.</summary>
        private int _currentConcurrent = 0;
        /// <summary>Whether an exclusive writer is currently executing.</summary>
        private bool _currentlyExclusive = false;
        /// <summary>The non-generic factory to use for task creation.</summary>
        private readonly TaskFactory _factory;

        /// <summary>Initializes the AsyncReaderWriter using the default <see cref="Task.Factory"/>.</summary>
        public AsyncReaderWriter()
        {
            _factory = Task.Factory;
        }

        /// <summary>Initializes the AsyncReaderWriter with the specified TaskFactory for use in creating all tasks.</summary>
        /// <param name="factory">The TaskFactory to use to create all tasks.</param>
        /// <exception cref="ArgumentNullException"><paramref name="factory"/> is null.</exception>
        public AsyncReaderWriter(TaskFactory factory)
        {
            if (factory == null)
            {
                throw new ArgumentNullException("factory");
            }

            // Store the factory; every Queue* method and both Run*_RequiresLock helpers depend on it.
            _factory = factory;
        }

        /// <summary>Gets the number of exclusive operations currently queued.</summary>
        public int WaitingExclusive
        {
            get { lock (_lock) return _waitingExclusive.Count; }
        }

        /// <summary>Gets the number of concurrent operations currently queued.</summary>
        public int WaitingConcurrent
        {
            get { lock (_lock) return _waitingConcurrent.Count; }
        }

        /// <summary>Gets the number of concurrent operations currently executing.</summary>
        public int CurrentConcurrent
        {
            get { lock (_lock) return _currentConcurrent; }
        }

        /// <summary>Gets whether an exclusive operation is currently executing.</summary>
        public bool CurrentlyExclusive
        {
            get { lock (_lock) return _currentlyExclusive; }
        }

        /// <summary>Queues an exclusive writer action to the AsyncReaderWriter.</summary>
        /// <param name="action">The action to be executed exclusively.</param>
        /// <returns>A Task that represents the execution of the provided action.</returns>
        public Task QueueExclusiveWriter(Action action)
        {
            // Create the task unstarted. It is started by this primitive once it is safe to do so,
            // i.e. when no other task associated with this instance is executing.
            Task task = _factory.Create(state =>
            {
                // Run the user-provided action
                try { ((Action)state)(); }
                // Always release exclusivity, even if the action throws
                finally { FinishExclusiveWriter(); }
            }, action);

            EnqueueOrRunExclusive(task);

            // Return the created task for the caller to track.
            return task;
        }

        /// <summary>Queues an exclusive writer function to the AsyncReaderWriter.</summary>
        /// <typeparam name="TResult">The type of the value produced by the function.</typeparam>
        /// <param name="function">The function to be executed exclusively.</param>
        /// <returns>A Task that represents the execution of the provided function.</returns>
        public Task<TResult> QueueExclusiveWriter<TResult>(Func<TResult> function)
        {
            // Create the task unstarted. It is started by this primitive once it is safe to do so,
            // i.e. when no other task associated with this instance is executing.
            var task = _factory.Create(state =>
            {
                // Run the user-provided function
                try { return ((Func<TResult>)state)(); }
                // Always release exclusivity, even if the function throws
                finally { FinishExclusiveWriter(); }
            }, function);

            EnqueueOrRunExclusive(task);

            // Return the created task for the caller to track.
            return task;
        }

        /// <summary>Queues a concurrent reader action to the AsyncReaderWriter.</summary>
        /// <param name="action">The action to be executed concurrently.</param>
        /// <returns>A Task that represents the execution of the provided action.</returns>
        public Task QueueConcurrentReader(Action action)
        {
            // Create the task unstarted. It is started by this primitive once it is safe to do so,
            // i.e. when no exclusive task is running or waiting to run.
            Task task = _factory.Create(state =>
            {
                // Run the user-provided action
                try { ((Action)state)(); }
                // Always release the reader slot, even if the action throws
                finally { FinishConcurrentReader(); }
            }, action);

            EnqueueOrRunConcurrent(task);

            // Return the task to the caller.
            return task;
        }

        /// <summary>Queues a concurrent reader function to the AsyncReaderWriter.</summary>
        /// <typeparam name="TResult">The type of the value produced by the function.</typeparam>
        /// <param name="function">The function to be executed concurrently.</param>
        /// <returns>A Task that represents the execution of the provided function.</returns>
        public Task<TResult> QueueConcurrentReader<TResult>(Func<TResult> function)
        {
            // Create the task unstarted. It is started by this primitive once it is safe to do so,
            // i.e. when no exclusive task is running or waiting to run.
            var task = _factory.Create(state =>
            {
                // Run the user-provided function
                try { return ((Func<TResult>)state)(); }
                // Always release the reader slot, even if the function throws
                finally { FinishConcurrentReader(); }
            }, function);

            EnqueueOrRunConcurrent(task);

            // Return the task to the caller.
            return task;
        }

        /// <summary>Either queues the exclusive task or starts it immediately, under the instance lock.</summary>
        /// <param name="task">The not-yet-started exclusive task.</param>
        private void EnqueueOrRunExclusive(Task task)
        {
            lock (_lock)
            {
                // If anything is already running, or other exclusive tasks are already waiting,
                // queue it. Otherwise no one else is running or wants to run, so start it now.
                if (_currentlyExclusive || _currentConcurrent > 0 || _waitingExclusive.Count > 0)
                {
                    _waitingExclusive.Enqueue(task);
                }
                else
                {
                    RunExclusive_RequiresLock(task);
                }
            }
        }

        /// <summary>Either queues the concurrent task or starts it immediately, under the instance lock.</summary>
        /// <param name="task">The not-yet-started concurrent task.</param>
        private void EnqueueOrRunConcurrent(Task task)
        {
            lock (_lock)
            {
                // If any exclusive task is running or waiting, the reader must wait behind it.
                if (_currentlyExclusive || _waitingExclusive.Count > 0)
                {
                    _waitingConcurrent.Enqueue(task);
                }
                // Otherwise start it immediately.
                else
                {
                    RunConcurrent_RequiresLock(task);
                }
            }
        }

        /// <summary>Starts the specified exclusive task.</summary>
        /// <param name="exclusive">The exclusive task to be started.</param>
        /// <remarks>This must only be executed while holding the instance's lock.</remarks>
        private void RunExclusive_RequiresLock(Task exclusive)
        {
            _currentlyExclusive = true;
            exclusive.Start(_factory.GetTargetScheduler());
        }

        /// <summary>Starts the specified concurrent task.</summary>
        /// <param name="concurrent">The concurrent task to be started.</param>
        /// <remarks>This must only be executed while holding the instance's lock.</remarks>
        private void RunConcurrent_RequiresLock(Task concurrent)
        {
            _currentConcurrent++;
            concurrent.Start(_factory.GetTargetScheduler());
        }

        /// <summary>Starts all queued concurrent tasks.</summary>
        /// <remarks>This must only be executed while holding the instance's lock.</remarks>
        private void RunConcurrent_RequiresLock()
        {
            while (_waitingConcurrent.Count > 0)
            {
                RunConcurrent_RequiresLock(_waitingConcurrent.Dequeue());
            }
        }

        /// <summary>Completes the processing of a concurrent reader.</summary>
        private void FinishConcurrentReader()
        {
            lock (_lock)
            {
                // Update the tracking count of the number of concurrently executing tasks
                _currentConcurrent--;

                // If we've now hit zero tasks running concurrently and there are any waiting writers, run one of them
                if (_currentConcurrent == 0 && _waitingExclusive.Count > 0)
                {
                    RunExclusive_RequiresLock(_waitingExclusive.Dequeue());
                }
                // Otherwise, if there are no waiting writers but there are waiting readers for some reason (they should
                // have started when they were added by the user), run all concurrent tasks waiting.
                else if (_waitingExclusive.Count == 0 && _waitingConcurrent.Count > 0)
                {
                    RunConcurrent_RequiresLock();
                }
            }
        }

        /// <summary>Completes the processing of an exclusive writer.</summary>
        private void FinishExclusiveWriter()
        {
            lock (_lock)
            {
                // We're no longer executing exclusively, though this might get reversed shortly
                _currentlyExclusive = false;

                // If there are any more waiting exclusive tasks, run the next one in line
                if (_waitingExclusive.Count > 0)
                {
                    RunExclusive_RequiresLock(_waitingExclusive.Dequeue());
                }
                // Otherwise, if there are any waiting concurrent tasks, run them all
                else if (_waitingConcurrent.Count > 0)
                {
                    RunConcurrent_RequiresLock();
                }
            }
        }
    }
}